{
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6-final"
  },
  "orig_nbformat": 2,
  "kernelspec": {
   "name": "python3",
   "display_name": "Python 3",
   "language": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2,
 "cells": [
  {
   "source": [
    "# 用训练好的模型对test_format1进行预测"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark import SparkContext\n",
    "from pyspark import SparkConf\n",
    "from pyspark.mllib.util import MLUtils\n",
    "from pyspark.mllib.linalg import Vectors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "conf=SparkConf().setAppName(\"miniProject\").setMaster(\"local\").set(\"spark.executor.memory\",\"3g\")\\\n",
    "        .set(\"spark.executor.instances\",\"2\")\n",
    "sc=SparkContext.getOrCreate(conf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入测试集\n",
    "spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .master(\"local\")\\\n",
    "        .appName(\"DataRead\")\\\n",
    "        .getOrCreate()\n",
    "test_data = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/procd_test_real.csv\", encoding='utf8', header=True, inferSchema=True) \n",
    "test_data = test_data.rdd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "#将测试集的特征转为向量\n",
    "test = test_data.map(lambda line: (line[0],line[1],line[2],Vectors.dense(line[3:])))"
   ]
  },
  {
   "source": [
    "## Logistic Regression"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.classification import LogisticRegressionModel\n",
    "lr_model = LogisticRegressionModel.load(sc, \"hdfs://node1:9000/user/root/exp4/models/LogisticRegressionModel\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "lr_predictions = test.map(lambda line: (line[0],line[1],float(lr_model.predict(line[3]))))\n",
    "lr_predictions.coalesce(1).toDF().write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/predictions/lr_predictions.csv\")"
   ]
  },
  {
   "source": [
    "日期:2020-12-20 14:08:52 排名: 无\n",
    "score:0.5015744"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "source": [
    "## SVM"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.classification import SVMModel\n",
    "svm_model = SVMModel.load(sc, \"hdfs://node1:9000/user/root/exp4/models/SVMWithSGDModel\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "svm_predictions = test.map(lambda line: (line[0],line[1],float(svm_model.predict(line[3]))))\n",
    "svm_predictions.coalesce(1).toDF().write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/predictions/svm_predictions.csv\")"
   ]
  },
  {
   "source": [
    "日期:2020-12-20 14:18:59 排名: 无\n",
    "score:0.5156678"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "source": [
    "## Gradient Boosted Trees"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.tree import GradientBoostedTreesModel\n",
    "GBDT_model = GradientBoostedTreesModel.load(sc, \"hdfs://node1:9000/user/root/exp4/models/myGradientBoostingClassificationModel\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [],
   "source": [
    "predictions = GBDT_model.predict(test.map(lambda x: x[3]))\n",
    "GBDT_predictions = test.map(lambda lp: (lp[0],lp[1])).zip(predictions).map(lambda lp:(lp[0][0],lp[0][1],lp[1]))\n",
    "GBDT_predictions.coalesce(1).toDF().write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/predictions/GBDT_predictions.csv\")\n",
    "\n",
    "# GBDT_predictions = test.map(lambda line: (line[0],line[1],float(GBDT_model.predict(line[3]))))\n",
    "# GBDT_predictions.coalesce(1).toDF().write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/predictions/GBDT_predictions.csv\")"
   ]
  },
  {
   "source": [
    "日期:2020-12-20 14:51:00 排名: 无\n",
    "score:0.5000562"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "source": [
    "## SVM with Normalized data"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.classification import SVMModel\n",
    "svm_model2 = SVMModel.load(sc, \"hdfs://node1:9000/user/root/exp4/models/NormalizedSVMWithSGDModel\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "#将数据规范化\n",
    "from pyspark.mllib.feature import Normalizer\n",
    "features = test.map(lambda x: x[3])\n",
    "normalizer = Normalizer()\n",
    "normalized_test = test.map(lambda lp: (lp[0],lp[1])).zip(normalizer.transform(features)).map(lambda lp:(lp[0][0],lp[0][1],lp[1]))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "[(1461, 2775, DenseVector([0.6124, 0.2041, 0.6124, 0.2041, 0.2041, 0.2041, 0.2041, 0.0, 0.2041, 0.0])), (1783, 4864, DenseVector([0.2298, 0.0, 0.8043, 0.517, 0.1149, 0.1149, 0.0574, 0.0, 0.0574, 0.0])), (1969, 4186, DenseVector([0.4682, 0.117, 0.7022, 0.4682, 0.117, 0.117, 0.117, 0.0, 0.117, 0.0])), (2044, 4491, DenseVector([0.0, 0.0, 0.937, 0.2499, 0.0625, 0.1562, 0.1249, 0.0, 0.1249, 0.0])), (2163, 2995, DenseVector([0.0889, 0.0, 0.9181, 0.2962, 0.1777, 0.1185, 0.0889, 0.0, 0.0889, 0.0])), (2194, 2459, DenseVector([0.1627, 0.0542, 0.8677, 0.3254, 0.2169, 0.2169, 0.1085, 0.0, 0.0542, 0.0542])), (2406, 4775, DenseVector([0.7171, 0.1195, 0.5976, 0.1195, 0.1195, 0.239, 0.1195, 0.0, 0.1195, 0.0])), (2494, 3236, DenseVector([0.8528, 0.0, 0.2132, 0.2132, 0.2132, 0.2132, 0.2132, 0.0, 0.2132, 0.0])), (2756, 2669, DenseVector([0.0, 0.1091, 0.6547, 0.5455, 0.4364, 0.2182, 0.1091, 0.0, 0.1091, 0.0])), (3466, 1892, DenseVector([0.5721, 0.0, 0.3814, 0.3814, 0.286, 0.0953, 0.3814, 0.0, 0.3814, 0.0]))]\n"
     ]
    }
   ],
   "source": [
    "print(normalized_test.take(10))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "svm_predictions2 = normalized_test.map(lambda line: (line[0],line[1],float(svm_model2.predict(line[2]))))\n",
    "svm_predictions2.coalesce(1).toDF().write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/predictions/svm_predictions2.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "spark.stop()\n",
    "sc.stop()"
   ]
  },
  {
   "source": [
    "日期:2020-12-20 15:06:01 排名: 无\n",
    "score:0.5000000"
   ],
   "cell_type": "markdown",
   "metadata": {}
  }
 ]
}